package org.codehaus.activemq.store.jdbc;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import javax.jms.JMSException;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.store.PreparedTransactionStore;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.codehaus.activemq.util.FactoryFinder;
import org.codehaus.activemq.util.JMSExceptionHelper;

public class JDBCPersistenceAdapter extends PersistenceAdapterSupport
{
  private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
  private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/codehaus/activemq/store/jdbc/");
  private final WireFormat wireFormat;
  private final DataSource ds;
  private JDBCAdapter adapter;

  public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat)
  {
    this.ds = ds;
    this.wireFormat = wireFormat;
  }

  public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
    if (this.adapter == null) {
      throw new IllegalStateException("Not started");
    }
    return new JDBCMessageStore(this.adapter, this.wireFormat, destinationName);
  }

  public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
    if (this.adapter == null) {
      throw new IllegalStateException("Not started");
    }
    return new JDBCTopicMessageStore(this.adapter, this.wireFormat, destinationName);
  }

  public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
    if (this.adapter == null) {
      throw new IllegalStateException("Not started");
    }
    return new JDBCPreparedTransactionStore(this.adapter, this.wireFormat);
  }

  public void beginTransaction() throws JMSException {
    try {
      Connection c = this.ds.getConnection();
      c.setAutoCommit(false);
      TransactionContext.pushConnection(c);
    }
    catch (SQLException e) {
      throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e);
    }
  }

  public void commitTransaction() throws JMSException {
    Connection c = TransactionContext.popConnection();
    if (c == null)
      log.warn("Commit while no transaction in progress");
    else
      try
      {
        c.commit();
      }
      catch (SQLException e) {
        throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e);
      }
      finally {
        try {
          c.close();
        }
        catch (Throwable e)
        {
        }
      }
  }

  public void rollbackTransaction() {
    Connection c = TransactionContext.popConnection();
    try {
      c.rollback();
    }
    catch (SQLException e) {
      log.warn("Cannot rollback transaction due to: " + e, e);
    }
    finally {
      try {
        c.close();
      }
      catch (Throwable e)
      {
      }
    }
  }

  public void start() throws JMSException {
    beginTransaction();
    try {
      Connection c = TransactionContext.getConnection();

      this.adapter = null;
      String database = null;
      try {
        database = c.getMetaData().getDriverName();
        database = database.replaceAll(" ", "_");

        log.debug("Database type: [" + database + "]");
        try {
          this.adapter = ((DefaultJDBCAdapter)factoryFinder.newInstance(database));
        }
        catch (Throwable e) {
          log.warn("Unrecognized database type (" + database + ").  Will use default JDBC implementation", e);
        }

      }
      catch (SQLException e1)
      {
      }

      if (this.adapter == null) {
        this.adapter = new DefaultJDBCAdapter();
      }
      try
      {
        this.adapter.doCreateTables(c);
      }
      catch (SQLException e) {
        log.warn("Cannot create tables due to: " + e, e);
      }
      this.adapter.initSequenceGenerator(c);
    }
    finally
    {
      commitTransaction();
    }
  }

  public synchronized void stop()
    throws JMSException
  {
  }
}